-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a wait_for_samples
method to the MovingWindow
#1159
Add a wait_for_samples
method to the MovingWindow
#1159
Conversation
@@ -318,6 +320,34 @@ def window( | |||
start, end, force_copy=force_copy, fill_value=fill_value | |||
) | |||
|
|||
async def wait_for_samples(self, n: int) -> None: | |||
"""Wait until the next `n` samples are available in the MovingWindow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this is valid samples? If so, I am actually not sure if we want this or something time-based i.e. allow that not all samples are valid. However, for our current use-case this also works since we would set n=1 anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's interesting. valid means that any data was received. If a component is missing data, resampler will send None
and that is not a valid value.
If a component is sending only None
, should this function return after n
None
s are received? I'm guessing it should?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, in many scenarios I wouldn't distinguish between missing or None values. I think it shouldn't return after n Nones but after n
new time steps which have at least 1 real value. However, we could leave this also for later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated it to return after n
samples are received. Whether they were valid or not needs to be checked with a call to count_valid
. I've also updated the docs to state this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see the confusion, I understood that it triggers when n
new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.
This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe:
"""Wait until the next `n` samples have been received in the MovingWindow.
This function returns after `n` input samples have been received, without considering
whether the received samples are valid or which timestamp they have. The validity of
the samples in the updated moving window can be verified by calling the
[`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it would wait until there are n
output samples, but some output samples could be nan.
It does consider the timestamps when the samples are received. It expects n
"new" samples to be available in the buffer before it returns.
The current tests only cover cases where there is no resampling in the moving window. I'll rectify that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated to clarify that it returns after n
new samples in the moving window, and not just input samples, as discussed.
a001bf7
to
f94c4fa
Compare
@@ -318,6 +320,34 @@ def window( | |||
start, end, force_copy=force_copy, fill_value=fill_value | |||
) | |||
|
|||
async def wait_for_samples(self, n: int) -> None: | |||
"""Wait until the next `n` samples are available in the MovingWindow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see the confusion, I understood that it triggers when n
new output samples have been "received", i.e. there are timestamps in the resulting moving window. But this is about input samples. So even if we receive 100 samples, if these are all older than newest timestamp we wouldn't get any new timestamp in the window but updated data points of older timestamps.
This makes sense to me, would stress that in the doc though, e.g the valid samples part is confusing IMO since this is indeed about the new samples.
@@ -318,6 +320,34 @@ def window( | |||
start, end, force_copy=force_copy, fill_value=fill_value | |||
) | |||
|
|||
async def wait_for_samples(self, n: int) -> None: | |||
"""Wait until the next `n` samples are available in the MovingWindow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe:
"""Wait until the next `n` samples have been received in the MovingWindow.
This function returns after `n` input samples have been received, without considering
whether the received samples are valid or which timestamp they have. The validity of
the samples in the updated moving window can be verified by calling the
[`count_valid`][frequenz.sdk.timeseries.MovingWindow.count_valid] method.
raise ValueError( | ||
"The number of samples to wait for must be less than or equal to the " | ||
+ f"capacity of the MovingWindow ({self.capacity})." | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could also just silently wait for self.capacity
instead, like slicing when you do [:10]
for an array that has less than 10 items. It can be more confusing if you wanted to wait for 10 and got 5 instead but it is something at least familiar in Python. Again maybe @cwasicki can hint which approach would be more intuitive for data scientists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought of it that if n > capacity the n would still be respected, but calculated in terms of time steps since this was triggered. But I think the different understanding here shows already that this could be confusing and we can limit it until we have a case where we need it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, true, best to start with a safe approach, for my behaviour you can still easily get is via mw.wait_for_samples(min(n, mw.capacity))
I guess, so it doesn't look like it is adding too much value.
async with self._condition_new_sample: | ||
# Every time a new sample is received, this condition gets notified and | ||
# will wake up. | ||
_ = await self._condition_new_sample.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just thinking out loud, and I don't even think we need to think about it for this PR, but maybe this could be done more efficiently by reversing the logic, and only set the condition when the counter is set (it could be even be a simple Event
in this case). This would mean we would need to save the "waiters" in the instance, and then notify the appropriate waiter, so maybe that's a bit costly too, but I guess the most common case will be having only one or very few waiters. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The most common case is also with n=1 and resampling_interval=15 minutes.
f5cb345
to
2155dca
Compare
It retains the original behaviour of counting all the valid samples in the buffer when no time range is specified. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
It retains the original behaviour of counting all the valid samples in the buffer when no time range is specified. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
The resampler sends `None` values when there is no source data. In this case, the values need to be sent to the buffer, so that it can update its gaps immediately, instead of waiting until when data is finally available. Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
2155dca
to
a2754f4
Compare
Also added tests for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just minor clarification proposal.
async def wait_for_samples(self, n: int) -> None: | ||
"""Wait until the next `n` samples are available in the MovingWindow. | ||
|
||
This function returns after `n` samples are available in the MovingWindow, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would say after n new samples are available
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
Signed-off-by: Sahas Subramanian <sahas.subramanian@proton.me>
a2754f4
to
32e27cb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I disabled auto-merge to give @cwasicki the opportunity to approve too, as he knows what's needed best.
He had already approved. |
Closes #967